package server

import (
	"bytes"
	"encoding/json"
	"fmt"
	"github.com/gorilla/websocket"
	"github.com/segmentio/ksuid"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
	"time"
)

const (
	// Time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// Time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10

	// Maximum message size allowed from peer.
	// Passed to the connection's read limit (maximum number of bytes read).
	maxMessageSize = 1024 * 2
)

var (
	// newline and space are used together as the search and replacement
	// arguments of bytes.Replace: every newline in an incoming payload is
	// turned into a space before the payload is JSON-decoded.
	newline = []byte{'\n'}
	space   = []byte{' '}
)

// Msg is the JSON envelope used both for messages exchanged with WebSocket
// clients and for requests accepted by GroupPush.
type Msg struct {
	// Type selects how the message is handled; it is compared
	// case-insensitively. Clients must send "message"; GroupPush accepts
	// "BROADCAST" and "GROUP"; error replies to a client use "ERROR".
	Type      string `json:"type"`
	// Group is the target group of a "GROUP" push. On messages relayed from a
	// client it is overwritten with the sender's group.
	Group     string `json:"group"`
	// BindingID is overwritten with the sender's binding ID on messages
	// relayed from a client.
	BindingID string `json:"binding_id"`
	// Content is the message payload. GroupPush rejects an empty Content.
	Content   string `json:"content"`
}

// upgrader is the shared WebSocket upgrader used by ServeWs, with 1024-byte
// read and write buffers.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// NOTE(review): this accepts every request regardless of its Origin
	// header, so any web page can open a socket to this server. Confirm this
	// is intended before exposing the endpoint to browsers.
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

// Client represents one upgraded WebSocket connection together with the
// identifiers and channels its read and write loops need.
type Client struct {
	// hub receives this client on its registered/unregistered channels and is
	// queried for the other clients of the same group.
	hub       *Hub
	// dispense is optional. When it is non-nil and has a non-empty receiveUrl,
	// every message written to the peer is also pushed to dispense.send.
	dispense  *Dispense
	// uuid is generated per connection in ServeWs; it is used to avoid
	// relaying a message back to its sender.
	uuid      string
	// bindingID comes from the "bindingID" query parameter and is stamped on
	// every message relayed from this client.
	bindingID string
	// group comes from the "group" query parameter; messages from this client
	// are relayed to the other clients of the same group.
	group     string
	// send queues outbound messages for the write loop.
	send      chan []byte
	// conn is the underlying WebSocket connection.
	conn      *websocket.Conn
}

// checkAuthorization reports whether the request is allowed to push.
//
// It returns true only when the request's Authorization header is exactly
// equal to one of the secrets held by secretManager, and false when no secrets
// are configured.
//
// Neither the header value nor the configured secrets are logged here: both
// are credentials and must not end up in process output.
func checkAuthorization(request *http.Request, secretManager *SecretManager) bool {
	if len(secretManager.secret) == 0 {
		return false
	}

	auth := request.Header.Get("Authorization")
	for _, v := range secretManager.secret {
		if v == auth {
			return true
		}
	}
	return false
}

// ServeWs upgrades an HTTP request to a WebSocket connection and attaches the
// resulting client to hub.
//
// The "group" query parameter is required; when it is empty the handler
// answers with status 500 and does not attempt the upgrade. The "bindingID"
// query parameter is stored on the client unchanged. After a successful
// upgrade the new client is sent to hub.registered and its write and read
// loops are started in their own goroutines.
func ServeWs(hub *Hub, dispense *Dispense, writer http.ResponseWriter, request *http.Request) {
	query := request.URL.Query()
	group, bindingID := query.Get("group"), query.Get("bindingID")

	if group == "" {
		writer.WriteHeader(500)
		writer.Write([]byte("请传入group"))
		return
	}

	conn, err := upgrader.Upgrade(writer, request, nil)
	if err != nil {
		log.Println(err)
		return
	}

	// Each connection gets its own identifier and a buffered outbound queue.
	client := &Client{
		hub:       hub,
		dispense:  dispense,
		uuid:      ksuid.New().String(),
		group:     group,
		bindingID: bindingID,
		send:      make(chan []byte, 256),
		conn:      conn,
	}

	// Register before starting the loops so the hub knows the client first.
	hub.registered <- client
	go client.writeListener()
	go client.readListener()
}

// GroupPush is the HTTP handler that pushes a message to connected clients.
//
// The request must pass checkAuthorization, otherwise 401 is returned. The
// body is a JSON-encoded Msg with a non-empty Content whose Type (compared
// case-insensitively) selects the delivery mode:
//
//   - "BROADCAST": the re-encoded message is offered to hub.broadcast.
//   - "GROUP": the message, which must carry a non-empty Group, is offered to
//     hub.targetpush.
//
// Both hand-offs are non-blocking: if the hub cannot take the message
// immediately the request fails with 500. On success the message is echoed
// back in the response body. A body that cannot be read yields 400; every
// other validation failure yields 500, as before.
func GroupPush(hub *Hub, secretManager *SecretManager, writer http.ResponseWriter, request *http.Request) {
	if !checkAuthorization(request, secretManager) {
		writer.WriteHeader(401)
		writer.Write([]byte("未认证或认证失败"))
		return
	}
	defer request.Body.Close()

	body, err := ioutil.ReadAll(request.Body)
	if err != nil {
		fmt.Printf("read body err, %v\n", err)
		// Always answer the caller; returning silently would be reported as
		// an implicit 200 with an empty body.
		writer.WriteHeader(400)
		writer.Write([]byte("读取请求体失败"))
		return
	}

	// Replace newlines with spaces and trim before decoding.
	body = bytes.TrimSpace(bytes.Replace(body, newline, space, -1))

	message := new(Msg)
	if err := json.Unmarshal(body, message); err != nil {
		writer.WriteHeader(500)
		writer.Write([]byte("解析数据失败"))
		return
	}

	if message.Content == "" {
		writer.WriteHeader(500)
		writer.Write([]byte("消息体不能为空"))
		return
	}

	switch {
	case strings.EqualFold(message.Type, "BROADCAST"):
		payload, err := json.Marshal(message)
		if err != nil {
			writer.WriteHeader(500)
			writer.Write([]byte("广播推送失败"))
			return
		}
		select {
		case hub.broadcast <- payload:
			writer.Write(payload)
		default:
			// The hub is not ready to accept the message right now.
			writer.WriteHeader(500)
			writer.Write([]byte("广播推送失败"))
		}

	case strings.EqualFold(message.Type, "GROUP"):
		if message.Group == "" {
			writer.WriteHeader(500)
			writer.Write([]byte("推送目标不能为空"))
			return
		}
		select {
		case hub.targetpush <- message:
			writer.Write(body)
		default:
			// The hub is not ready to accept the message right now.
			writer.WriteHeader(500)
			writer.Write([]byte("推送目标失败"))
		}

	default:
		writer.WriteHeader(500)
		writer.Write([]byte("消息类型错误"))
	}
}

// Secret is the HTTP handler that adds or removes a push secret.
//
// The Authorization header must satisfy secretManager.isAdmin, otherwise 401
// is returned. The "secret" form value is required (400 when empty). The
// request method, compared case-insensitively, selects the operation:
//
//   - POST offers the secret to secretManager.add.
//   - DELETE offers the secret to secretManager.del.
//
// Both hand-offs are non-blocking and fail with 500 when the manager cannot
// take the value immediately. Any other method is rejected with 405 instead
// of being reported as a success although nothing was done. On success the
// response is 200; the change is applied asynchronously by the manager.
func Secret(secretManager *SecretManager, writer http.ResponseWriter, request *http.Request) {
	auth := request.Header.Get("Authorization")
	if !secretManager.isAdmin(auth) {
		writer.WriteHeader(401)
		writer.Write([]byte("无操作权限"))
		return
	}

	secret := request.FormValue("secret")
	if secret == "" {
		writer.WriteHeader(400)
		writer.Write([]byte("secret is nil"))
		return
	}

	switch {
	case strings.EqualFold(request.Method, "POST"):
		select {
		case secretManager.add <- []byte(secret):
		default:
			writer.WriteHeader(500)
			writer.Write([]byte(fmt.Sprintf("POST失败，%s", secret)))
			return
		}

	case strings.EqualFold(request.Method, "DELETE"):
		select {
		case secretManager.del <- []byte(secret):
		default:
			writer.WriteHeader(500)
			writer.Write([]byte(fmt.Sprintf("DELETE失败，%s", secret)))
			return
		}

	default:
		// Nothing would be enqueued for this method; do not claim success.
		writer.Header().Set("Allow", "POST, DELETE")
		writer.WriteHeader(405)
		writer.Write([]byte("method not allowed"))
		return
	}

	writer.WriteHeader(200)
	writer.Write([]byte(fmt.Sprintf("处理完成，可能延时生效,%s", secret)))
}

// writeListener is the client's write loop. It runs until a write fails or
// client.send is closed, and closes the connection when it returns.
//
// Two events are served:
//
//   - every pingPeriod a ping is written; if that fails the client is sent to
//     hub.unregistered and the loop ends.
//   - every message received from client.send is written to the peer as a
//     text message. When client.dispense is set and has a non-empty
//     receiveUrl, a successfully written message is also pushed to
//     client.dispense.send.
//
// Each write is bounded by a writeWait deadline.
func (client *Client) writeListener() {
	// Periodic pings detect peers that have gone away.
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		client.conn.Close()
	}()

	for {
		select {
		case <-ticker.C:
			client.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				// The peer is gone: ask the hub to drop this client.
				client.hub.unregistered <- client
				return
			}

		case message, ok := <-client.send:
			client.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// client.send has been closed (expected to happen only in
				// the hub): tell the peer we are closing. This is best
				// effort, the connection is closed right after anyway.
				client.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			// WriteMessage obtains a writer, writes the payload and closes
			// the writer, reporting a failure of any of the three steps.
			if err := client.conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}

			// Forward the delivered message to the callback dispatcher when
			// one is configured.
			if client.dispense != nil && len(client.dispense.receiveUrl) > 0 {
				client.dispense.send <- message
			}
		}
	}
}

// readListener is the client's read loop. It runs until reading from the
// connection fails; on exit the client is sent to hub.unregistered and the
// connection is closed.
//
// Incoming frames are limited to maxMessageSize and must arrive within
// pongWait; the deadline is renewed before every read and on every pong.
// Each frame is decoded as a Msg. Frames that are not valid JSON, or whose
// Type is not "message" (case-insensitive), are answered with an "ERROR" Msg
// queued on client.send. Valid messages are stamped with the sender's
// bindingID and group and offered to every other client of the same group; a
// recipient whose send queue cannot take the message immediately is sent to
// its hub's unregistered channel.
func (client *Client) readListener() {
	defer func() {
		client.hub.unregistered <- client
		client.conn.Close()
	}()

	client.conn.SetReadLimit(maxMessageSize)
	client.conn.SetPongHandler(func(string) error {
		client.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	// replyError queues an ERROR message for this client only. Msg holds
	// nothing but strings, so the marshal error is deliberately ignored.
	replyError := func(content string) {
		payload, _ := json.Marshal(&Msg{Type: "ERROR", Content: content})
		client.send <- payload
	}

	for {
		client.conn.SetReadDeadline(time.Now().Add(pongWait))

		// Blocks until a frame arrives, the deadline passes or the
		// connection fails.
		_, raw, err := client.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("[wsevent] ReadMessage error: %v", err)
			}
			return
		}

		// Expected shape: {"type":"message","content":"xxxx"}.
		// Newlines become spaces and surrounding whitespace is dropped.
		raw = bytes.TrimSpace(bytes.Replace(raw, newline, space, -1))

		incoming := new(Msg)
		if err := json.Unmarshal(raw, incoming); err != nil {
			replyError(fmt.Sprintf("数据解析失败:%s", raw))
			continue
		}
		if !strings.EqualFold(incoming.Type, "message") {
			replyError(fmt.Sprintf("发送数据type错误【%s】", incoming.Type))
			continue
		}

		// The sender's identity always comes from the connection, never from
		// the payload.
		incoming.BindingID = client.bindingID
		incoming.Group = client.group
		outbound, _ := json.Marshal(incoming)

		for _, peer := range client.hub.GetClientsByGroup(client.group) {
			// Do not echo the message back to its sender.
			if peer.uuid == client.uuid {
				continue
			}
			select {
			case peer.send <- outbound:
			default:
				// The recipient's queue cannot take the message: drop it
				// from the hub.
				peer.hub.unregistered <- peer
			}
		}
	}
}
